-
Notifications
You must be signed in to change notification settings - Fork 926
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARTEMIS-5037: option to limit mirror propagation #5220
base: main
Are you sure you want to change the base?
Conversation
...ol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
Outdated
Show resolved
Hide resolved
...e/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Outdated
Show resolved
Hide resolved
@tabish121 Thanks for the review. FYI this is in a very drafty state ATM. Many refactors might come after that, in any case I'll make sure to take your notes in. |
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Outdated
Show resolved
Hide resolved
...e/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
Thanks @gemmellr for the review! There are still things that are not working with the current PR and some refactors in the added lines might happen. In any case I'll make sure to take your comments in. |
343dcc0
to
73fbd27
Compare
...java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
I've exposed the new |
...ol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
Outdated
Show resolved
Hide resolved
...ol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
Outdated
Show resolved
Hide resolved
...ol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Show resolved
Hide resolved
...e/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPChainedReplicaTest.java
Outdated
Show resolved
Hide resolved
artemis-server/src/main/resources/schema/artemis-configuration.xsd
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Outdated
Show resolved
Hide resolved
ad1161a
to
b2289ca
Compare
cbeb338
to
946cb19
Compare
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
Outdated
Show resolved
Hide resolved
artemis-server/src/main/resources/schema/artemis-configuration.xsd
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Outdated
Show resolved
Hide resolved
...java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
8c33b97
to
d74aa05
Compare
.../src/main/java/org/apache/activemq/artemis/core/server/mirror/NoForwardMirrorController.java
Outdated
Show resolved
Hide resolved
artemis-server/src/main/resources/schema/artemis-configuration.xsd
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/activemq/artemis/core/server/mirror/NoForwardMirrorController.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/activemq/artemis/core/server/mirror/NoForwardMirrorController.java
Outdated
Show resolved
Hide resolved
...e/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
I have figured out something I don't know how to fix On this topology:
producing and consuming on 1 works, 2 mirrors, 3 is left alone producing on 1 and consuming on 2 breaks:
I don't know how to overcome that. Because if I decide to forward the Ack to one when I consume on two, then if the message is consumed on 1 it'll receive a duplicate Ack from 2. |
Yeah, if you say noForward, and dont provide a reverse mirror, you are not getting the ack. Thats basically as requested. there is no way to fix that without making the mirroring some kind of full topology mesh handling system with 'command forwarding between nodes without acting on them'. I'd consider that setup basically an invalid use of this already-pretty-questionable functionality. Ask for no forwarding -> get no forwarding. |
it makes sense to have some basic checks on the mirror, but it is also fine to have restrictions on the topologies that will work with a mirror and others that won't work. The most important thing imho is that it is understandable and deterministic |
921bdf8
to
7b146bd
Compare
My issue from two weeks ago was resolved by annotating the Ack with the ID of the intended receiver. And when the message is in the actual process of being sent a filtering procedure occurs to check if the actual receiver matches the ID stored in the ACK's annotation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gave it a quick skim, commented on some small niggles. Didnt fully consider if as yet, e.g if the internal-to-consumer filtering actually works ok (seems like there could be potential to throw off metrics, credit handling, acking, etc). There are some issues the checks build is pointing out (I might have commented on some of them, though expect there are others I didnt catch in the skim). I didn't get to the tests yet, will look tomorrow.
...ol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
Outdated
Show resolved
Hide resolved
9199ce1
to
210f382
Compare
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
Outdated
Show resolved
Hide resolved
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value | ||
* than the remoteMirrorID, false otherwise. | ||
*/ | ||
public boolean filterMessage(MessageReference ref) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whilst this may stop the message actually being sent, its not clear to me it does anything around actually cleaning it up? As far as the broker is concerned it sent a message, but the other end can never acknowledge it since it never actually arrives as it wasnt sent...so is it just stuck in the structure as a delivered-but-not-acked message? Presumably the snf queue delivering metrics would show this (and and be forever incorrect) if so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added another commit trying another approach at filtering by doing it at an higher level. Let me know what you think about this.
fe1ab30
to
5c39729
Compare
4e126f6
to
17dd899
Compare
...e/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
Outdated
Show resolved
Hide resolved
if (logger.isDebugEnabled()) { | ||
// TODO | ||
} | ||
return HandleStatus.NO_MATCH; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though different than the previous drop-inside-consumer approach, which also never actually sends the message, doing this instead still similarly means nothing evers consume the message. So its still just going to stay there forever (difference this time is, it wont be in a zombie-delivering state). Something will need to be done with the message.
...ava/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
Outdated
Show resolved
Hide resolved
...a/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java
Outdated
Show resolved
Hide resolved
try (Connection conn = factory.createConnection()) { | ||
Session session = conn.createSession(); | ||
conn.start(); | ||
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); | ||
assertNull(consumer.receiveNoWait()); | ||
consumer.close(); | ||
} | ||
|
||
try (Connection conn = factory2.createConnection()) { | ||
Session session = conn.createSession(); | ||
conn.start(); | ||
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); | ||
assertNull(consumer.receiveNoWait()); | ||
consumer.close(); | ||
} | ||
|
||
try (Connection conn = factory3.createConnection()) { | ||
Session session = conn.createSession(); | ||
conn.start(); | ||
MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); | ||
assertNull(consumer.receiveNoWait()); | ||
consumer.close(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a lot of repeated creating new connections to the same server. I'd be inclined to have a larger try-with-resources that created the connections and then [re]used them as needed.
Not necessarily for the whole test, but where it makes sense and makes things succinct and more efficient without any notable change in behaviour. E.g the 'consume from 1, from 2, from 3....then do the same again to check there is still nothing' all seems like it would be functionally the same without the extra connection creations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to do that, but that makes the test fails. I'll show you next week.
...java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
...java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
...java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPSquareMirroringTest.java
Outdated
Show resolved
Hide resolved
Add a new option in the Mirror settings to prevent a broker from propagating messages. On a topology such as: ``` 1 ---> 2 ---> 3 ^______| ``` Where 1 is connected to 2 via a noForward link, the behavior is as follows: * Every command from 1 are reaching 2 and are stopping at 2 not reaching 3. * If a message is produced on 1 and consumed on 2, the message is acknowledged on 1 and 2. No ack is reaching 3. * Every message produced on 2 are mirrored on 1 and 3. * If a message is produced on 2 and consumed on 1, it is acked on 1 2 and 3 * If a message is produced on 3 and consumed on 3, it is acked only on 3.
Add a new option in the Mirror settings to prevent a broker from
propagating messages.
On a topology such as:
Where 1 is connected to 2 via a noForward link, the behavior is as
follows:
3.
acknowledged on 1 and 2. No ack is reaching 3.
and 3
3.